[EP]: Add MCCL all-to-all fallback for MACA EP #2592
Code Review
This pull request introduces MACA support and a PyTorch-based fallback implementation of the alltoall operation, including new CUDA kernels for fused packing, compacting, and reducing. The review feedback focuses on critical optimizations and safety improvements in the CUDA kernels, such as utilizing shared memory to reduce redundant global memory reads, adding bounds checks to prevent out-of-bounds accesses, and guarding kernel launches against empty inputs to avoid invalid configuration errors. Additionally, it is recommended to make the PyTorch alltoall fallback implementation stateless to prevent potential race conditions when multiple MoE layers are interleaved.
    int count = load_i32_words(rank_base + local_expert * fused_hidden);

    int begin = 0;
    for (int r = 0; r < src_rank; ++r) {
      const nv_bfloat16* prev_rank_base =
          recv_payload + r * fused_slots_per_rank * fused_hidden;
      begin += load_i32_words(prev_rank_base + local_expert * fused_hidden);
    }

    if (m == 0 && threadIdx.x == 0) {
      layout_range[local_expert * num_ranks + src_rank] =
          (static_cast<int64_t>(begin) << 32) | static_cast<uint32_t>(count);
      atomicAdd(recv_count + local_expert, count);
    }

    if (m >= count) return;

    int src_begin = 0;
    for (int e = 0; e < local_expert; ++e)
      src_begin += load_i32_words(rank_base + e * fused_hidden);
In compact_dispatch_fused_kernel, the loop to compute begin and src_begin is executed redundantly by all 256 threads in the block. Since these values only depend on blockIdx.x (which is constant for all threads in the block), we can compute them once in thread 0 and share them via __shared__ memory. This significantly reduces redundant global memory reads and instruction overhead.
    __shared__ int shared_count;
    __shared__ int shared_begin;
    __shared__ int shared_src_begin;
    if (threadIdx.x == 0) {
      int count = load_i32_words(rank_base + local_expert * fused_hidden);
      shared_count = count;
      int begin = 0;
      for (int r = 0; r < src_rank; ++r) {
        const nv_bfloat16* prev_rank_base =
            recv_payload + r * fused_slots_per_rank * fused_hidden;
        begin += load_i32_words(prev_rank_base + local_expert * fused_hidden);
      }
      shared_begin = begin;
      int src_begin = 0;
      for (int e = 0; e < local_expert; ++e)
        src_begin += load_i32_words(rank_base + e * fused_hidden);
      shared_src_begin = src_begin;
    }
    __syncthreads();
    int count = shared_count;
    int begin = shared_begin;
    if (m == 0 && threadIdx.x == 0) {
      layout_range[local_expert * num_ranks + src_rank] =
          (static_cast<int64_t>(begin) << 32) | static_cast<uint32_t>(count);
      atomicAdd(recv_count + local_expert, count);
    }
    if (m >= count) return;
    int src_begin = shared_src_begin;
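To make the index arithmetic easier to follow, here is a small host-side Python model of what the kernel computes per block. It is a sketch under stated assumptions, not the kernel's actual data layout: `counts[rank][expert]` stands in for the per-rank count headers read by `load_i32_words`, and the helper names `block_offsets`, `pack_layout_range`, and `unpack_layout_range` are hypothetical.

```python
from typing import List, Tuple


def block_offsets(counts: List[List[int]], src_rank: int,
                  local_expert: int) -> Tuple[int, int, int]:
    """Return (count, begin, src_begin) for one (src_rank, local_expert) pair.

    counts[r][e] is assumed to hold the number of messages rank r sent to
    local expert e (the value the kernel reads with load_i32_words).
    """
    count = counts[src_rank][local_expert]
    # Exclusive prefix sum over earlier ranks for the same expert.
    begin = sum(counts[r][local_expert] for r in range(src_rank))
    # Exclusive prefix sum over earlier experts within the same rank.
    src_begin = sum(counts[src_rank][e] for e in range(local_expert))
    return count, begin, src_begin


def pack_layout_range(begin: int, count: int) -> int:
    """Mirror (int64(begin) << 32) | uint32(count) from the kernel."""
    return (begin << 32) | (count & 0xFFFFFFFF)


def unpack_layout_range(packed: int) -> Tuple[int, int]:
    """Split a packed layout_range entry back into (begin, count)."""
    return packed >> 32, packed & 0xFFFFFFFF


# Two ranks, two local experts.
counts = [[2, 1], [3, 4]]
count, begin, src_begin = block_offsets(counts, src_rank=1, local_expert=1)
packed = pack_layout_range(begin, count)
```

Both prefix sums depend only on `src_rank` and `local_expert`, which is why a single thread per block can compute them and publish the results through shared memory.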
    int64_t expert = topk_idx[i];
    if (expert < 0) return;

    count_dispatch_kernel<<<ceil_div(total, 256), 256, 0, stream>>>(
        topk_idx.data_ptr<int64_t>(), counts_by_expert.data_ptr<int32_t>(),
        num_tokens, num_topk, num_local_experts);
If total is 0 (e.g., empty batch or no tokens), launching count_dispatch_kernel with 0 blocks will result in a CUDA invalid configuration error. We should wrap the kernel launch in a check if (total > 0) to handle empty inputs gracefully.
    if (total > 0) {
      count_dispatch_kernel<<<ceil_div(total, 256), 256, 0, stream>>>(
          topk_idx.data_ptr<int64_t>(), counts_by_expert.data_ptr<int32_t>(),
          num_tokens, num_topk, num_local_experts);
    }
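The grid-size arithmetic behind this guard can be checked on the host. The sketch below assumes `ceil_div` is the usual round-up integer division (its definition is not shown in the diff), and `grid_blocks` and `should_launch` are hypothetical helper names used only for illustration.

```python
def ceil_div(a: int, b: int) -> int:
    """Round-up integer division; assumed to match the helper in the PR."""
    return (a + b - 1) // b


def grid_blocks(total: int, threads_per_block: int = 256) -> int:
    """Number of blocks a launch of `total` work items would request."""
    return ceil_div(total, threads_per_block)


def should_launch(total: int, threads_per_block: int = 256) -> bool:
    """A grid dimension of zero is not a valid launch configuration."""
    return grid_blocks(total, threads_per_block) > 0


# An empty batch yields a zero-block grid, so the launch must be skipped.
empty_grid = grid_blocks(0)
one_item_grid = grid_blocks(1)
```

The same check applies to any launch whose grid size is derived from an input size that can legitimately be zero.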
    pack_dispatch_fused_kernel<<<total, 256, 0, stream>>>(
        reinterpret_cast<const nv_bfloat16*>(x.data_ptr()),
        topk_idx.data_ptr<int64_t>(),
        expert_offsets.data_ptr<int32_t>(), counters.data_ptr<int32_t>(),
        reinterpret_cast<nv_bfloat16*>(send_payload.data_ptr()),
        send_route.data_ptr<int64_t>(), num_tokens, hidden, num_topk,
        num_ranks, num_local_experts, max_messages_per_rank);
If total is 0 (e.g., empty batch or no tokens), launching pack_dispatch_fused_kernel with 0 blocks will result in a CUDA invalid configuration error. We should wrap the kernel launch in a check if (total > 0) to handle empty inputs gracefully.
    if (total > 0) {
      pack_dispatch_fused_kernel<<<total, 256, 0, stream>>>(
          reinterpret_cast<const nv_bfloat16*>(x.data_ptr()),
          topk_idx.data_ptr<int64_t>(),
          expert_offsets.data_ptr<int32_t>(), counters.data_ptr<int32_t>(),
          reinterpret_cast<nv_bfloat16*>(send_payload.data_ptr()),
          send_route.data_ptr<int64_t>(), num_tokens, hidden, num_topk,
          num_ranks, num_local_experts, max_messages_per_rank);
    }
    compact_dispatch_fused_kernel<<<num_ranks * num_local_experts *
                                        max_messages_per_rank,
                                    256, 0, stream>>>(
        reinterpret_cast<const nv_bfloat16*>(recv_payload.data_ptr()),
        layout_range.data_ptr<int64_t>(), recv_count.data_ptr<int32_t>(),
        return_src_pos.data_ptr<int64_t>(),
        reinterpret_cast<nv_bfloat16*>(packed_recv_x.data_ptr()),
        packed_recv_src_info.data_ptr<int32_t>(), hidden, num_ranks,
        num_local_experts, max_messages_per_rank, num_recv_slots);
If max_messages_per_rank is 0, launching compact_dispatch_fused_kernel with 0 blocks will result in a CUDA invalid configuration error. We should wrap the kernel launch in a check if (max_messages_per_rank > 0) to handle empty inputs gracefully.
    if (max_messages_per_rank > 0) {
      compact_dispatch_fused_kernel<<<num_ranks * num_local_experts *
                                          max_messages_per_rank,
                                      256, 0, stream>>>(
          reinterpret_cast<const nv_bfloat16*>(recv_payload.data_ptr()),
          layout_range.data_ptr<int64_t>(), recv_count.data_ptr<int32_t>(),
          return_src_pos.data_ptr<int64_t>(),
          reinterpret_cast<nv_bfloat16*>(packed_recv_x.data_ptr()),
          packed_recv_src_info.data_ptr<int32_t>(), hidden, num_ranks,
          num_local_experts, max_messages_per_rank, num_recv_slots);
    }
        self._torch_alltoall_state = {
            "send_route": send_route,
            "return_src_pos": return_src_pos,
            "num_tokens": num_tokens,
            "num_topk": num_topk,
            "max_messages_per_rank": max_messages_per_rank,
        }
The current implementation of _torch_alltoall_routed_dispatch and _torch_alltoall_routed_combine uses a stateful dictionary self._torch_alltoall_state to store send_route and return_src_pos. This can lead to subtle bugs or race conditions if multiple MoE layers are executed concurrently or interleaved. We can make the implementation completely stateless by wrapping send_route and return_src_pos inside the src_info tuple returned by dispatch and passed to combine via handle.
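One way to picture the stateless variant is sketched below. This is a toy model under loud assumptions, not the PR's implementation: plain Python lists stand in for tensors, `DispatchHandle`, `dispatch`, and `combine` are hypothetical names, and the routing is reduced to a simple permutation. The only point it illustrates is that the routing metadata travels with the handle instead of living on `self`.

```python
from typing import List, NamedTuple, Tuple


class DispatchHandle(NamedTuple):
    """Everything combine needs, returned by dispatch (hypothetical type)."""
    send_route: List[int]
    return_src_pos: List[int]
    num_tokens: int
    num_topk: int
    max_messages_per_rank: int


def dispatch(tokens: List[str], route: List[int], num_topk: int = 1,
             max_messages_per_rank: int = 8) -> Tuple[List[str], DispatchHandle]:
    """Toy dispatch: slot i of the output holds tokens[route[i]]."""
    sent = [tokens[src] for src in route]
    handle = DispatchHandle(
        send_route=list(route),
        return_src_pos=list(route),  # slot -> original token position
        num_tokens=len(tokens),
        num_topk=num_topk,
        max_messages_per_rank=max_messages_per_rank,
    )
    return sent, handle


def combine(expert_out: List[str], handle: DispatchHandle) -> List[str]:
    """Toy combine: scatter each slot back to its original position."""
    out = [""] * handle.num_tokens
    for slot, src in enumerate(handle.return_src_pos):
        out[src] = expert_out[slot]
    return out


# Two "layers" dispatch before either one combines.
sent_a, handle_a = dispatch(["a0", "a1", "a2"], route=[2, 0, 1])
sent_b, handle_b = dispatch(["b0", "b1"], route=[1, 0])
restored_a = combine(sent_a, handle_a)
restored_b = combine(sent_b, handle_b)
```

Because each call to `combine` reads only its own handle, the second dispatch cannot overwrite the routing data that the first combine still needs, which is the failure mode the shared dictionary allows.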
Force-pushed from 0112980 to 187db4a.
Codecov Report: All modified and coverable lines are covered by tests.
Force-pushed from 088f20b to 5c5c73e.
Force-pushed from 5c5c73e to 10103df.
        self.num_ep_buffer_bytes = num_ep_buffer_bytes
        self.backend = self.group
        # NIC auto-detection happens inside ep.Buffer via Topology::discover().
        _debug_init(self.rank, "before ep.Buffer")
Do we really need such debug messages?
Removed. These messages were only used while diagnosing MACA initialization hangs and are not needed in the submitted path.
        if self.group_size == 1:
            _debug_init(self.rank, "single-rank skip ipc handle export")
            self._use_fallback = False
            _debug_init(self.rank, "connect done fallback=False")
            return
Why do we need to handle this special case?
This is intentional. With a single rank there is no peer that can import an IPC handle, so exporting/gathering IPC handles is unnecessary. On MACA it also avoids an unnecessary driver IPC call during initialization. I added a comment to make this explicit.
        self.connect()

    def _maca_phase_fence(self, send_event: Optional[Any] = None) -> None:
        if not _USE_MACA or _MACA_PHASE_FENCE in {"", "0", "off", "none"}:
In what cases do we need the MACA phase fence flag? I'm concerned that new env vars add a burden for users to understand, so it would be very helpful if you could explain this up front.
This was originally a diagnostic switch used to compare different MACA phase synchronization strategies while debugging the split SEND/RECV path. I agree it should not be exposed as a user-facing env var, so I removed the flag and diagnostic modes. The remaining fence is an internal MACA compatibility fence only.
        if not _USE_MACA or _MACA_PHASE_FENCE in {"", "0", "off", "none"}:
            return

        backend = dist.get_backend(self.group)
When do we use gloo instead of mooncake-cpu?
We only handle Gloo because some smoke tests pass a Gloo process group, so the tiny fence token uses CPU there while EP payload still stays on the P2P fast path.
Which smoke tests? I may be missing some context.
This refers to the local MACA validation script scripts/metax/smoke_ep_p2p.py, where Gloo is only used to isolate and validate the EP P2P payload path, not to replace mooncake-cpu in production.
@@ -1,4 +1,7 @@
-add_library(mooncake_ep ep_py.cpp mooncake_ep_buffer.cpp mooncake_ep_kernel.cu)
+add_library(mooncake_ep
Better leave this file unchanged lol
UNIDY2002 left a comment:
Would you mind updating the PR title to fit the actual impl?
Also, I have a small comment.
        self, device: torch.device, dtype: torch.dtype = torch.int32
    ) -> torch.Tensor:
        if not self._is_mooncake_backend():
            return torch.ones((self.group_size,), dtype=dtype, device=device)
I think we should raise a warning in this case, as running without mooncake-pg may lose the active_ranks features.
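A minimal sketch of what such a warning could look like, using the standard `warnings` module. It is an illustration only: `get_active_ranks` and its parameters are hypothetical, plain lists stand in for the tensor returned in the diff, and the message text is a placeholder rather than wording from the PR.

```python
import warnings
from typing import List, Optional


def get_active_ranks(is_mooncake_backend: bool, group_size: int,
                     backend_active_ranks: Optional[List[int]] = None) -> List[int]:
    """Return per-rank liveness flags, warning when they cannot be tracked.

    Hypothetical stand-in for the method in the diff: without the mooncake
    backend, every rank is reported as active because nothing tracks failures.
    """
    if not is_mooncake_backend:
        warnings.warn(
            "Process group is not a mooncake backend; active_ranks tracking "
            "is unavailable and all ranks are assumed active.",
            RuntimeWarning,
            stacklevel=2,
        )
        return [1] * group_size
    if backend_active_ranks is None:
        raise ValueError("backend_active_ranks is required for the mooncake path")
    return list(backend_active_ranks)
```

Emitting the warning through `warnings.warn` rather than `print` lets callers filter or escalate it with the usual warning filters, and Python's default filter reports a given warning only once per call site.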
Description
This PR stabilizes the MACA Expert Parallelism (EP) runtime path on a
single-node MetaX C500 setup. The submitted path does not use the experimental
MCCL all-to-all helper. It keeps the existing EP dispatch/combine kernels and
uses the TransferEngine device P2P/IPC fast path for intra-node GPU-to-GPU
payload movement.
The main goal is to make the MACA EP path compile, initialize, dispatch, and
combine reliably while keeping the change surface limited to MACA-specific
compatibility code.
Key changes:
Current runtime behavior:
- IBGDA unavailable, using P2P-only path when IBGDA cannot be initialized.
- fallback=False, ibgda_disabled=True, fast=True.
- MOONCAKE_EP_MACA_ALLOW_NODE_P2P=1.
Module
- mooncake-transfer-engine
- mooncake-store
- mooncake-ep
- mooncake-pg
- mooncake-integration
- mooncake-p2p-store
- mooncake-wheel
- mooncake-common
- mooncake-rl
Type of Change
How Has This Been Tested?
Build and static checks
Result: passed.
The EP extension was rebuilt in the MACA torch environment:
Result: passed.
EP smoke tests
2-GPU smoke:
Result:
4-GPU smoke with full node P2P enabled:
Result:
EP performance checks
The following measurements were collected on June 29, 2026 on a 4-GPU
MetaX C500 MACA node. Timing is max across ranks.
2 GPUs, tokens=2048, hidden=7168, topk=2, Gloo control plane:
4 GPUs, tokens=2048, hidden=7168, topk=2, MOONCAKE_EP_MACA_ALLOW_NODE_P2P=1, Gloo control plane:
4 GPUs, tokens=2048, hidden=7168, topk=4, MOONCAKE_EP_MACA_ALLOW_NODE_P2P=1, Gloo control plane:
Current Limitations
- measurements, not GPU-initiated RDMA measurements.
- close MetaXLink pairs. Full 4-GPU P2P testing requires MOONCAKE_EP_MACA_ALLOW_NODE_P2P=1.
- Python fallback path. That fallback is not the performance target of this PR.
- segfaulted during process-group teardown. Gloo control-plane runs completed cleanly, so this PR treats PG teardown as a separate follow-up issue.
- this submitted path.
Checklist
- ./scripts/code_format.sh
- pre-commit run --all-files and all hooks pass
AI Assistance Disclosure
AI assistance was used to inspect the MACA EP changes, summarize test results,
and draft this PR description.